// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !codes

package testkit

import (
	"context"
	"flag"
	"fmt"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"testing"
	"time"

	"github.com/pingcap/kvproto/pkg/keyspacepb"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/config/kerneltype"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/ddl/schematracker"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/keyspace"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/resourcemanager"
	"github.com/pingcap/tidb/pkg/session"
	kvstore "github.com/pingcap/tidb/pkg/store"
	"github.com/pingcap/tidb/pkg/store/driver"
	"github.com/pingcap/tidb/pkg/store/helper"
	"github.com/pingcap/tidb/pkg/store/mockstore"
	tidbutil "github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/gctuner"
	"github.com/pingcap/tidb/pkg/util/intest"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/stretchr/testify/require"
	"go.opencensus.io/stats/view"
	"go.uber.org/zap"
)

// WithTiKV holds the value of the "with-tikv" command-line flag: the address of
// a real TiKV cluster. When it is non-empty, CreateMockStore opens that cluster
// instead of creating a mock store. It is only meant for debugging locally.
var WithTiKV = flag.String("with-tikv", "", "address of tikv cluster, if set, running test with real tikv cluster")

// TestOption customizes a TestKit: it is a function that receives the TestKit
// and applies some setting to it.
type TestOption func(tk *TestKit)

// WithCascades returns a TestOption that sets the session variable
// tidb_enable_cascades_planner to "on" when on is true and to "off" otherwise.
func WithCascades(on bool) TestOption {
	mode := "off"
	if on {
		mode = "on"
	}
	// The statement only depends on `on`, so it is built once up front.
	stmt := fmt.Sprintf("set @@tidb_enable_cascades_planner = %s", mode)
	return func(tk *TestKit) {
		tk.MustExec(stmt)
	}
}

// RunTestUnderCascades runs testFunc once per planner mode, as the sub-tests
// "off" and "on". Each sub-test gets a fresh mock store and a TestKit on which
// the matching WithCascades option has already been applied. testFunc also
// receives the mode name and the unqualified name of the function that called
// RunTestUnderCascades.
func RunTestUnderCascades(t *testing.T, testFunc func(t *testing.T, tk *TestKit, cascades, caller string)) {
	type plannerMode struct {
		name  string
		apply TestOption
	}
	modes := []plannerMode{
		{name: "off", apply: WithCascades(false)},
		{name: "on", apply: WithCascades(true)},
	}

	// Resolve the caller's function name and strip everything up to the last dot.
	callerPC, _, _, found := runtime.Caller(1)
	require.True(t, found)
	qualified := runtime.FuncForPC(callerPC).Name()
	callerName := qualified[strings.LastIndex(qualified, ".")+1:]

	for i := range modes {
		mode := modes[i]
		t.Run(mode.name, func(t *testing.T) {
			tk := NewTestKit(t, CreateMockStore(t))
			mode.apply(tk)
			testFunc(t, tk, mode.name, callerName)
		})
	}
}

// RunTestUnderCascadesWithDomain runs testFunc once per planner mode, as the
// sub-tests "off" and "on". Each sub-test gets a fresh mock store and domain
// plus a TestKit on which the matching WithCascades option has already been
// applied. testFunc also receives the domain, the mode name and the unqualified
// name of the function that called RunTestUnderCascadesWithDomain.
func RunTestUnderCascadesWithDomain(t *testing.T, testFunc func(t *testing.T, tk *TestKit, domain *domain.Domain, cascades, caller string)) {
	type plannerMode struct {
		name  string
		apply TestOption
	}
	modes := []plannerMode{
		{name: "off", apply: WithCascades(false)},
		{name: "on", apply: WithCascades(true)},
	}

	// Resolve the caller's function name and strip everything up to the last dot.
	callerPC, _, _, found := runtime.Caller(1)
	require.True(t, found)
	qualified := runtime.FuncForPC(callerPC).Name()
	callerName := qualified[strings.LastIndex(qualified, ".")+1:]

	for i := range modes {
		mode := modes[i]
		t.Run(mode.name, func(t *testing.T) {
			store, do := CreateMockStoreAndDomain(t)
			tk := NewTestKit(t, store)
			mode.apply(tk)
			testFunc(t, tk, do, mode.name, callerName)
		})
	}
}

// RunTestUnderCascadesAndDomainWithSchemaLease runs testFunc once per planner
// mode, as the sub-tests "off" and "on". Each sub-test creates its store and
// domain through CreateMockStoreAndDomainWithSchemaLease using the given schema
// lease and store options, and applies the matching WithCascades option to a
// new TestKit. testFunc also receives the domain, the mode name and the
// unqualified name of the function that called this helper.
func RunTestUnderCascadesAndDomainWithSchemaLease(t *testing.T, lease time.Duration, opts []mockstore.MockTiKVStoreOption, testFunc func(t *testing.T, tk *TestKit, domain *domain.Domain, cascades, caller string)) {
	type plannerMode struct {
		name  string
		apply TestOption
	}
	modes := []plannerMode{
		{name: "off", apply: WithCascades(false)},
		{name: "on", apply: WithCascades(true)},
	}

	// Resolve the caller's function name and strip everything up to the last dot.
	callerPC, _, _, found := runtime.Caller(1)
	require.True(t, found)
	qualified := runtime.FuncForPC(callerPC).Name()
	callerName := qualified[strings.LastIndex(qualified, ".")+1:]

	for i := range modes {
		mode := modes[i]
		t.Run(mode.name, func(t *testing.T) {
			store, do := CreateMockStoreAndDomainWithSchemaLease(t, lease, opts...)
			tk := NewTestKit(t, store)
			mode.apply(tk)
			testFunc(t, tk, do, mode.name, callerName)
		})
	}
}

// CreateMockStore returns a new kv.Storage for the test and registers the
// cleanups that tear it down when t finishes.
//
// When the -with-tikv flag is set, the store is opened on that TiKV cluster
// (opts are not used on this path), the DDL owner manager is started and a
// session is bootstrapped on it. Otherwise tryMakeImage is called first and the
// store is created through CreateMockStoreAndDomain with opts.
func CreateMockStore(t testing.TB, opts ...mockstore.MockTiKVStoreOption) kv.Storage {
	if *WithTiKV != "" {
		var d driver.TiKVDriver
		store, err := d.Open("tikv://" + *WithTiKV)
		require.NoError(t, err)
		// Cleanups run in LIFO order. Each resource registers its own cleanup
		// right after it was acquired successfully, so a failure half-way
		// through releases exactly what exists, and the full teardown order
		// stays: domain -> owner manager -> store -> view.
		t.Cleanup(func() {
			err := store.Close()
			require.NoError(t, err)
			view.Stop()
		})
		config.GetGlobalConfig().Store = config.StoreTypeTiKV
		require.NoError(t, ddl.StartOwnerManager(context.Background(), store))
		t.Cleanup(func() {
			ddl.CloseOwnerManager()
		})
		dom, err := session.BootstrapSession(store)
		// The error must be checked before dom is used: registering dom.Close
		// first would call Close on an unusable domain when bootstrap fails.
		require.NoError(t, err)
		t.Cleanup(func() {
			dom.Close()
		})
		return store
	}
	t.Cleanup(func() {
		view.Stop()
	})
	gctuner.GlobalMemoryLimitTuner.Stop()
	tryMakeImage()
	store, _ := CreateMockStoreAndDomain(t, opts...)
	// Panics if the returned store does not implement helper.Storage.
	_ = store.(helper.Storage)
	return store
}

// tryMakeImage tries to create a bootstrapped storage; the store is used as an
// image for testing later.
//
// It returns immediately when mockstore.ImageAvailable reports true. Otherwise
// it calls tryMakeImageOnce until that call either asks for no retry or returns
// an error, sleeping 100ms between attempts. Creating the image is best-effort:
// an error is logged and not propagated to the caller.
//
// opts is currently not used by the implementation.
func tryMakeImage(opts ...mockstore.MockTiKVStoreOption) {
	if mockstore.ImageAvailable() {
		return
	}
	for {
		retry, err := tryMakeImageOnce()
		if err != nil {
			// Do not drop the failure silently; the caller carries on without
			// any signal from this function.
			logutil.BgLogger().Warn("failed to make bootstrapped store image", zap.Error(err))
			return
		}
		if !retry {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

// tryMakeImageOnce makes one attempt at building the bootstrapped store image
// at mockstore.ImageFilePath.
//
// An exclusive, non-blocking flock on a fixed lock file keeps other processes
// from building the image at the same time. retry is true when the lock file
// could not be created or the lock could not be acquired; in that case nothing
// was built and err is nil. Otherwise retry is false and err is the first error
// hit while creating the store, bootstrapping, closing the store or releasing
// the lock.
func tryMakeImageOnce() (retry bool, err error) {
	const lockFile = "/tmp/tidb-unistore-bootstraped-image-lock-file"
	lock, err := os.Create(lockFile)
	if err != nil {
		// NOTE(review): every create failure is reported as "retry", so a
		// permanent failure makes the caller loop forever - confirm intended.
		return true, nil
	}

	locked := false
	defer func() {
		// Cleanup errors are only reported when nothing failed earlier, so the
		// error of the actual work is never overwritten by the cleanup.
		if locked {
			if unlockErr := syscall.Flock(int(lock.Fd()), syscall.LOCK_UN); err == nil {
				err = unlockErr
			}
		}
		// Best-effort close; the descriptor was only used for locking.
		_ = lock.Close()
		// Only the lock holder removes the file. Removing it without holding
		// the lock would let a third process create a fresh file and lock it
		// while the current holder is still building the image.
		if locked {
			if rmErr := os.Remove(lockFile); err == nil {
				err = rmErr
			}
		}
	}()

	// Prevent other process from creating the image concurrently
	if err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
		return true, nil
	}
	locked = true

	// Now this is the only instance to do the operation.
	store, err := mockstore.NewMockStore(
		mockstore.WithStoreType(mockstore.EmbedUnistore),
		mockstore.WithPath(mockstore.ImageFilePath))
	if err != nil {
		return false, err
	}

	session.SetSchemaLease(500 * time.Millisecond)
	session.DisableStats4Test()
	domain.DisablePlanReplayerBackgroundJob4Test()
	domain.DisableDumpHistoricalStats4Test()
	dom, err := session.BootstrapSession(store)
	if err != nil {
		// Do not leak the store; the bootstrap error is the one to report, so
		// a secondary close error is intentionally ignored.
		_ = store.Close()
		return false, err
	}
	dom.SetStatsUpdating(true)

	dom.Close()
	err = store.Close()

	return false, err
}

// DistExecutionContext is the context
// that used in Distributed execution test for Dist task framework and DDL.
// It bundles one store with the domains that were bootstrapped on it.
// TODO remove it after we can start multiple DDL job scheduler separately.
type DistExecutionContext struct {
	// Store is the storage all domains of this context were bootstrapped on.
	Store          kv.Storage
	// domains are the domains created by the constructor; GetDomain indexes
	// into this slice.
	domains        []*domain.Domain
	// deletedDomains are closed together with domains by Close.
	// NOTE(review): presumably holds domains removed from `domains` during a
	// test - nothing in the visible code adds to it; confirm.
	deletedDomains []*domain.Domain
	// t is the test that owns this context; Close registers its cleanup on it.
	t              testing.TB
	// mu is held by TriggerOwnerChange and by the cleanup registered in Close.
	mu             sync.Mutex
}

// TriggerOwnerChange makes the first domain that currently reports itself as
// DDL owner resign its ownership. It does nothing when no domain is the owner.
func (d *DistExecutionContext) TriggerOwnerChange() {
	d.mu.Lock()
	defer d.mu.Unlock()
	for _, dom := range d.domains {
		om := dom.DDL().OwnerManager()
		if !om.IsOwner() {
			continue
		}
		// A nil Context must not be passed, even if the callee tolerates it;
		// context.Background() is the neutral root context.
		// The returned error is deliberately ignored: resigning is
		// best-effort here and this method has no way to report it.
		_ = om.ResignOwner(context.Background())
		return
	}
}

// Close registers a cleanup on the owning test. It does not release anything
// right away; when the cleanup runs it stops the global memory limit tuner,
// closes all deleted and live domains concurrently, waits for them, and then
// closes the store, failing the test if closing the store returns an error.
func (d *DistExecutionContext) Close() {
	teardown := func() {
		d.mu.Lock()
		defer d.mu.Unlock()
		gctuner.GlobalMemoryLimitTuner.Stop()

		var closers tidbutil.WaitGroupWrapper
		for _, group := range [][]*domain.Domain{d.deletedDomains, d.domains} {
			for _, dom := range group {
				closers.Run(dom.Close)
			}
		}
		closers.Wait()

		require.NoError(d.t, d.Store.Close())
	}
	d.t.Cleanup(teardown)
}

// GetDomain returns the domain stored at position idx. It panics when idx is
// out of range, as any slice index does.
func (d *DistExecutionContext) GetDomain(idx int) *domain.Domain {
	dom := d.domains[idx]
	return dom
}

// NewDistExecutionContext creates a DistExecutionContext with serverNum domains
// for testing, using a schema lease of 500ms.
func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext {
	const defaultLease = 500 * time.Millisecond
	return NewDistExecutionContextWithLease(t, serverNum, defaultLease)
}

// NewDistExecutionContextWithLease creates a DistExecutionContext for testing.
// It creates one mock store and bootstraps serverNum domains on it with the
// given schema lease. All domains share one MockSessionManager, and the DDL IDs
// of the domains are logged.
func NewDistExecutionContextWithLease(t testing.TB, serverNum int, lease time.Duration) *DistExecutionContext {
	store, err := mockstore.NewMockStore()
	require.NoError(t, err)
	gctuner.GlobalMemoryLimitTuner.Stop()

	var (
		doms    = make([]*domain.Domain, 0, serverNum)
		ddlIDs  = make([]string, 0, serverNum)
		sessMgr = &MockSessionManager{}
	)
	for i := 0; i < serverNum; i++ {
		dom := bootstrap4DistExecution(t, store, lease)
		// Every domain except the last one gets a no-op close hook.
		if i < serverNum-1 {
			dom.SetOnClose(func() { /* don't delete the store in domain map */ })
		}
		dom.InfoSyncer().SetSessionManager(sessMgr)
		doms = append(doms, dom)
		ddlIDs = append(ddlIDs, dom.DDL().GetID())
	}
	logutil.BgLogger().Info("domain DDL IDs", zap.Strings("IDs", ddlIDs))

	return &DistExecutionContext{
		Store:          schematracker.UnwrapStorage(store),
		domains:        doms,
		deletedDomains: []*domain.Domain{},
		t:              t,
	}
}

// CreateMockStoreAndDomain returns a new mock kv.Storage together with the
// *domain.Domain bootstrapped on it with a 500ms schema lease. In the nextgen
// kernel the global config is first switched via updateConfigForNextgen. The
// returned store is the unwrapped storage and must implement helper.Storage.
func CreateMockStoreAndDomain(t testing.TB, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
	if kerneltype.IsNextGen() {
		updateConfigForNextgen(t)
	}

	rawStore, err := mockstore.NewMockStore(opts...)
	require.NoError(t, err)

	const lease = 500 * time.Millisecond
	dom := bootstrap(t, rawStore, lease)
	dom.InfoSyncer().SetSessionManager(&MockSessionManager{})

	stopGlobals := func() {
		view.Stop()
		gctuner.GlobalMemoryLimitTuner.Stop()
	}
	t.Cleanup(stopGlobals)

	unwrapped := schematracker.UnwrapStorage(rawStore)
	// Panics if the unwrapped store does not implement helper.Storage.
	_ = unwrapped.(helper.Storage)
	return unwrapped, dom
}

// keyspaceIDAlloc is the counter CreateMockStoreAndDomainForKS increments to
// obtain the keyspace ID of each non-system keyspace it creates.
var keyspaceIDAlloc atomic.Int32

// CreateMockStoreAndDomainForKS returns a new mock kv.Storage and
// *domain.Domain for the keyspace named ks.
//
// This function is mainly for cross-keyspace tests, so normally you should use
// CreateMockStoreAndDomain instead.
//
// The global config is copied up front and restored when t finishes. For the
// SYSTEM keyspace a fixed keyspace ID is used and the created store is also
// registered through kvstore.SetSystemStorage; every other keyspace gets the
// next ID from keyspaceIDAlloc.
func CreateMockStoreAndDomainForKS(t testing.TB, ks string, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
	intest.Assert(kerneltype.IsNextGen(), "CreateMockStoreAndDomainForKS should only be used in nextgen kernel tests")

	bak := *config.GetGlobalConfig()
	t.Cleanup(func() {
		config.StoreGlobalConfig(&bak)
	})
	// NOTE(review): CreateMockStoreAndDomain below calls updateConfigForNextgen
	// when kerneltype.IsNextGen() is true, which sets KeyspaceName to
	// keyspace.System again - confirm the value set here is meant to be
	// overridden while the store is bootstrapped.
	config.UpdateGlobal(func(conf *config.Config) {
		conf.KeyspaceName = ks
	})

	sysKSOpt := mockstore.WithKeyspaceMeta(&keyspacepb.KeyspaceMeta{
		Id:   uint32(0xFFFFFF) - 1,
		Name: keyspace.System,
	})
	ksOpt := sysKSOpt
	if ks != keyspace.System {
		ksOpt = mockstore.WithKeyspaceMeta(&keyspacepb.KeyspaceMeta{
			Id:   uint32(keyspaceIDAlloc.Add(1)),
			Name: ks,
		})
	}

	// opts may be a caller-owned slice passed with `...`. Capping its capacity
	// forces append to copy, so the caller's backing array is never written.
	ksOpts := append(opts[:len(opts):len(opts)], ksOpt)
	store, dom := CreateMockStoreAndDomain(t, ksOpts...)
	if ks == keyspace.System {
		kvstore.SetSystemStorage(store)
	}
	return store, dom
}

// bootstrap4DistExecution applies the test-wide session/domain switches (schema
// lease, stats, plan replayer background job and historical stats dump
// disabled) and bootstraps a domain on store through
// session.BootstrapSession4DistExecution. The test fails if bootstrapping
// returns an error. No cleanup is registered here.
func bootstrap4DistExecution(t testing.TB, store kv.Storage, lease time.Duration) *domain.Domain {
	// Global test switches must be set before the session is bootstrapped.
	session.SetSchemaLease(lease)
	session.DisableStats4Test()
	domain.DisablePlanReplayerBackgroundJob4Test()
	domain.DisableDumpHistoricalStats4Test()

	distDom, bootErr := session.BootstrapSession4DistExecution(store)
	require.NoError(t, bootErr)
	distDom.SetStatsUpdating(true)

	return distDom
}

// bootstrap applies the test-wide session/domain switches (schema lease, stats,
// plan replayer background job and historical stats dump disabled) and
// bootstraps a domain on store through session.BootstrapSession. It registers a
// cleanup on t that closes the domain, stops the opencensus view worker, closes
// the store (failing the test on error) and resets the instance resource
// manager.
func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Domain {
	// Global test switches must be set before the session is bootstrapped.
	session.SetSchemaLease(lease)
	session.DisableStats4Test()
	domain.DisablePlanReplayerBackgroundJob4Test()
	domain.DisableDumpHistoricalStats4Test()

	dom, bootErr := session.BootstrapSession(store)
	require.NoError(t, bootErr)
	dom.SetStatsUpdating(true)

	teardown := func() {
		dom.Close()
		view.Stop()
		require.NoError(t, store.Close())
		resourcemanager.InstanceResourceManager.Reset()
	}
	t.Cleanup(teardown)

	return dom
}

// CreateMockStoreWithSchemaLease returns a new mock kv.Storage whose domain is
// bootstrapped with the given schema lease. In the nextgen kernel the global
// config is first switched via updateConfigForNextgen. The domain itself is not
// returned.
func CreateMockStoreWithSchemaLease(t testing.TB, lease time.Duration, opts ...mockstore.MockTiKVStoreOption) kv.Storage {
	if nextgen := kerneltype.IsNextGen(); nextgen {
		updateConfigForNextgen(t)
	}
	created, _ := CreateMockStoreAndDomainWithSchemaLease(t, lease, opts...)
	unwrapped := schematracker.UnwrapStorage(created)
	return unwrapped
}

// CreateMockStoreAndDomainWithSchemaLease returns a new mock kv.Storage (the
// unwrapped storage) and the *domain.Domain bootstrapped on it with the given
// schema lease. The domain's info syncer is given a fresh MockSessionManager.
func CreateMockStoreAndDomainWithSchemaLease(t testing.TB, lease time.Duration, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
	rawStore, err := mockstore.NewMockStore(opts...)
	require.NoError(t, err)

	dom := bootstrap(t, rawStore, lease)
	dom.InfoSyncer().SetSessionManager(&MockSessionManager{})

	unwrapped := schematracker.UnwrapStorage(rawStore)
	return unwrapped, dom
}

// SetTiFlashReplica looks up table dbName.tableName in the domain's current
// info schema and overwrites the TiFlashReplica of its meta with a single,
// available replica. The test fails if the table cannot be found.
func SetTiFlashReplica(t testing.TB, dom *domain.Domain, dbName, tableName string) {
	schema, table := ast.NewCIStr(dbName), ast.NewCIStr(tableName)
	tbl, lookupErr := dom.InfoSchema().TableByName(context.Background(), schema, table)
	require.NoError(t, lookupErr)

	replica := &model.TiFlashReplicaInfo{
		Count:     1,
		Available: true,
	}
	tbl.Meta().TiFlashReplica = replica
}

// updateConfigForNextgen sets KeyspaceName in the global config to
// keyspace.System and registers a cleanup on t that stores a copy of the
// previous global config back.
//
// In nextgen, the SYSTEM keyspace must be bootstrapped first. To make unit
// tests easier they always run inside the SYSTEM keyspace; a test that needs to
// bootstrap multiple keyspaces should use CreateMockStoreAndDomainForKS
// instead.
func updateConfigForNextgen(t testing.TB) {
	t.Helper()

	saved := *config.GetGlobalConfig()
	restore := func() {
		config.StoreGlobalConfig(&saved)
	}
	t.Cleanup(restore)

	useSystemKeyspace := func(c *config.Config) {
		c.KeyspaceName = keyspace.System
	}
	config.UpdateGlobal(useSystemKeyspace)
}
